// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/stream_pipeline_driver.h"

#include "common/statusor.h"
#include "exec/pipeline/pipeline_driver.h"
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "exec/pipeline/runtime_filter_types.h"
#include "exec/pipeline/scan/morsel.h"
#include "exec/workgroup/work_group.h"
#include "exec/workgroup/work_group_fwd.h"

namespace starrocks::pipeline {

// Runs one scheduling quantum of this stream driver on a worker thread.
//
// Repeatedly moves chunks between successive operator pairs (pull from
// operator i, push into operator i+1), starting from the first operator whose
// current epoch is not yet finished. The loop exits when the driver finishes,
// the current epoch finishes, the driver blocks (no chunk moved), or the
// on-core time budget is exhausted (yield).
//
// Returns the driver state to transition to, or a non-OK status on operator
// failure / fragment cancellation / resource-limit breach.
StatusOr<DriverState> StreamPipelineDriver::process(RuntimeState* runtime_state, int worker_id) {
    COUNTER_UPDATE(_schedule_counter, 1);
    SCOPED_TIMER(_active_timer);
    QUERY_TRACE_SCOPED("process", _driver_name);
    set_driver_state(DriverState::RUNNING);
    size_t total_chunks_moved = 0;
    size_t total_rows_moved = 0;
    int64_t time_spent = 0;
    Status return_status = Status::OK();
    // Flush per-quantum statistics exactly once on every successful exit path.
    DeferOp defer([&]() {
        if (return_status.ok()) {
            _update_statistics(runtime_state, total_chunks_moved, total_rows_moved, time_spent);
        }
    });

    bool should_yield = false;
    size_t num_chunks_moved = 0;
    size_t num_operators = _operators.size();
    size_t new_first_epoch_unfinished = _first_epoch_unfinished;
    while (true) {
        RETURN_IF_LIMIT_EXCEEDED(runtime_state, "Pipeline");

        // `finished` is only used to stop the MV maintenance and should be
        // run without blocking because there is no data processing.
        if (UNLIKELY(_operators[_first_unfinished]->is_finished())) {
            return _handle_finish_operators(runtime_state);
        }

        for (size_t i = _first_epoch_unfinished; i < num_operators - 1; ++i) {
            {
                SCOPED_RAW_TIMER(&time_spent);
                auto& curr_op = _operators[i];
                auto& next_op = _operators[i + 1];

                // Check curr_op epoch_finished firstly
                if (curr_op->is_epoch_finished()) {
                    if (i == 0) {
                        // For source operators
                        RETURN_IF_ERROR(return_status = _mark_operator_epoch_finishing(curr_op, runtime_state));
                    }
                    RETURN_IF_ERROR(return_status = _mark_operator_epoch_finishing(next_op, runtime_state));
                    new_first_epoch_unfinished = i + 1;
                    continue;
                }

                // try successive operator pairs
                if (!curr_op->has_output() || !next_op->need_input()) {
                    continue;
                }

                if (_check_fragment_is_canceled(runtime_state)) {
                    return _state;
                }

                // pull chunk from current operator and push the chunk onto next
                // operator
                StatusOr<ChunkPtr> maybe_chunk;
                {
                    SCOPED_TIMER(curr_op->_pull_timer);
                    QUERY_TRACE_SCOPED(curr_op->get_name(), "pull_chunk");
                    maybe_chunk = curr_op->pull_chunk(runtime_state);
                }
                return_status = maybe_chunk.status();
                // EOF from pull_chunk is not an error; the operator will report
                // epoch/overall finish through its state instead.
                if (!return_status.ok() && !return_status.is_end_of_file()) {
                    LOG(WARNING) << "pull_chunk returns not ok status " << return_status.to_string();
                    return return_status;
                }

                if (_check_fragment_is_canceled(runtime_state)) {
                    return _state;
                }

                if (return_status.ok() && maybe_chunk.value()) {
                    auto chunk_value = maybe_chunk.value();
                    if (chunk_value->num_rows() > 0) {
                        size_t row_num = chunk_value->num_rows();
                        total_rows_moved += row_num;
                        {
                            SCOPED_TIMER(next_op->_push_timer);
                            QUERY_TRACE_SCOPED(next_op->get_name(), "push_chunk");
                            return_status = next_op->push_chunk(runtime_state, chunk_value);
                        }
                        // ignore empty chunk generated by per-tablet computation when query cache enabled
                        COUNTER_UPDATE(curr_op->_pull_row_num_counter, row_num);
                        COUNTER_UPDATE(curr_op->_pull_chunk_num_counter, 1);
                        COUNTER_UPDATE(next_op->_push_chunk_num_counter, 1);
                        COUNTER_UPDATE(next_op->_push_row_num_counter, row_num);
                    }
                    num_chunks_moved += 1;
                    total_chunks_moved += 1;
                }

                // Check curr_op epoch_finished again: the pull above may have
                // drained the operator's last chunk of this epoch.
                if (curr_op->is_epoch_finished()) {
                    // NOTE(review): the earlier epoch-finished branch assigns
                    // `new_first_epoch_unfinished = i + 1`, while this one
                    // increments by 1; the two agree only when the marker is
                    // already at i. Confirm the increment is intended.
                    new_first_epoch_unfinished += 1;
                    if (i == 0) {
                        RETURN_IF_ERROR(return_status = _mark_operator_epoch_finishing(curr_op, runtime_state));
                    }
                    RETURN_IF_ERROR(return_status = _mark_operator_epoch_finishing(next_op, runtime_state));
                    continue;
                }
            }

            // yield when total chunks moved or time spent on-core for evaluation
            // exceed the designated thresholds.
            if (time_spent >= YIELD_MAX_TIME_SPENT_NS) {
                should_yield = true;
                COUNTER_UPDATE(_yield_by_time_limit_counter, 1);
                break;
            }
            if (_workgroup != nullptr && time_spent >= YIELD_PREEMPT_MAX_TIME_SPENT_NS &&
                _workgroup->driver_sched_entity()->in_queue()->should_yield(this, time_spent)) {
                should_yield = true;
                COUNTER_UPDATE(_yield_by_preempt_counter, 1);
                break;
            }
        }

        // close finished operators and update _first_unfinished index
        for (auto i = _first_epoch_unfinished; i < new_first_epoch_unfinished; ++i) {
            RETURN_IF_ERROR(return_status = _mark_operator_epoch_finished(_operators[i], runtime_state));
        }
        _first_epoch_unfinished = new_first_epoch_unfinished;
        if (sink_operator()->is_epoch_finished()) {
            VLOG_ROW << "Driver epoch finished, driver=" << this->to_readable_string();
            RETURN_IF_ERROR(return_status = _mark_operator_epoch_finished(_operators.back(), runtime_state));
            set_driver_state(is_still_epoch_finishing() ? DriverState::EPOCH_PENDING_FINISH
                                                        : DriverState::EPOCH_FINISH);
            return _state;
        }

        // no chunk moved in current round means that the driver is blocked.
        // should yield means that the CPU core is occupied the driver for a
        // very long time so that the driver should switch off the core and
        // give chance for another ready driver to run.
        if (num_chunks_moved == 0 || should_yield) {
            if (is_precondition_block()) {
                set_driver_state(DriverState::PRECONDITION_BLOCK);
                COUNTER_UPDATE(_block_by_precondition_counter, 1);
            } else if (!sink_operator()->is_finished() && !sink_operator()->need_input()) {
                set_driver_state(DriverState::OUTPUT_FULL);
                COUNTER_UPDATE(_block_by_output_full_counter, 1);
            } else if (!source_operator()->is_finished() && !source_operator()->has_output()) {
                set_driver_state(DriverState::INPUT_EMPTY);
                COUNTER_UPDATE(_block_by_input_empty_counter, 1);
            } else {
                set_driver_state(DriverState::READY);
            }
            return _state;
        }
    }
}

// Drains and finishes the driver's operators once the first unfinished
// operator has reported is_finished() — i.e. the whole MV maintenance task is
// stopping, not just an epoch.
//
// Mirrors the main process() loop: moves remaining chunks between successive
// operator pairs, propagates finishing downstream, then closes finished
// operators and advances `_first_unfinished`. Returns the driver state to
// transition to (FINISH / PENDING_FINISH when the sink is done), or a non-OK
// status on operator failure.
StatusOr<DriverState> StreamPipelineDriver::_handle_finish_operators(RuntimeState* runtime_state) {
    size_t total_chunks_moved = 0;
    size_t total_rows_moved = 0;
    int64_t time_spent = 0;
    Status return_status = Status::OK();
    // Flush statistics exactly once on every successful exit path.
    DeferOp defer([&]() {
        if (return_status.ok()) {
            _update_statistics(runtime_state, total_chunks_moved, total_rows_moved, time_spent);
        }
    });

    // handle finished
    size_t num_operators = _operators.size();
    size_t new_first_unfinished = _first_unfinished;
    for (size_t i = _first_unfinished; i < num_operators - 1; ++i) {
        {
            SCOPED_RAW_TIMER(&time_spent);
            auto& curr_op = _operators[i];
            auto& next_op = _operators[i + 1];

            // Check curr_op finished firstly
            if (curr_op->is_finished()) {
                if (i == 0) {
                    // For source operators
                    RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
                }
                RETURN_IF_ERROR(return_status = _mark_operator_finishing(next_op, runtime_state));
                new_first_unfinished = i + 1;
                continue;
            }

            // try successive operator pairs
            if (!curr_op->has_output() || !next_op->need_input()) {
                continue;
            }

            if (_check_fragment_is_canceled(runtime_state)) {
                return _state;
            }

            // pull chunk from current operator and push the chunk onto next
            // operator
            StatusOr<ChunkPtr> maybe_chunk;
            {
                SCOPED_TIMER(curr_op->_pull_timer);
                QUERY_TRACE_SCOPED(curr_op->get_name(), "pull_chunk");
                maybe_chunk = curr_op->pull_chunk(runtime_state);
            }
            return_status = maybe_chunk.status();
            // EOF is expected while draining; anything else is a real failure.
            if (!return_status.ok() && !return_status.is_end_of_file()) {
                LOG(WARNING) << "pull_chunk returns not ok status " << return_status.to_string();
                return return_status;
            }

            if (_check_fragment_is_canceled(runtime_state)) {
                return _state;
            }

            if (return_status.ok() && maybe_chunk.value()) {
                auto chunk_value = maybe_chunk.value();
                if (chunk_value->num_rows() > 0) {
                    size_t row_num = chunk_value->num_rows();
                    total_rows_moved += row_num;
                    {
                        SCOPED_TIMER(next_op->_push_timer);
                        QUERY_TRACE_SCOPED(next_op->get_name(), "push_chunk");
                        return_status = next_op->push_chunk(runtime_state, chunk_value);
                    }
                    // ignore empty chunk generated by per-tablet computation when query cache enabled
                    COUNTER_UPDATE(curr_op->_pull_row_num_counter, row_num);
                    COUNTER_UPDATE(curr_op->_pull_chunk_num_counter, 1);
                    COUNTER_UPDATE(next_op->_push_chunk_num_counter, 1);
                    COUNTER_UPDATE(next_op->_push_row_num_counter, row_num);
                }
                total_chunks_moved += 1;
            }

            // Check curr_op finished again
            if (curr_op->is_finished()) {
                // TODO: need add control flag
                if (i == 0) {
                    // For source operators
                    RETURN_IF_ERROR(return_status = _mark_operator_finishing(curr_op, runtime_state));
                }
                RETURN_IF_ERROR(return_status = _mark_operator_finishing(next_op, runtime_state));
                new_first_unfinished = i + 1;
                continue;
            }
        }
    }

    // close finished operators and update _first_unfinished index
    for (auto i = _first_unfinished; i < new_first_unfinished; ++i) {
        RETURN_IF_ERROR(return_status = _mark_operator_finished(_operators[i], runtime_state));
    }
    _first_unfinished = new_first_unfinished;
    if (sink_operator()->is_finished()) {
        finish_operators(runtime_state);
        set_driver_state(is_still_pending_finish() ? DriverState::PENDING_FINISH : DriverState::FINISH);
        return _state;
    }
    return _state;
}

// Moves `op` into the EPOCH_FINISHING stage and invokes its
// set_epoch_finishing() hook. Idempotent: if the operator has already reached
// (or passed) EPOCH_FINISHING, nothing happens and OK is returned.
Status StreamPipelineDriver::_mark_operator_epoch_finishing(OperatorPtr& op, RuntimeState* state) {
    auto& stage = _operator_stages[op->get_id()];
    if (stage >= OperatorStage::EPOCH_FINISHING) {
        // Already epoch-finishing (or further along) — nothing to do.
        return Status::OK();
    }

    VLOG_ROW << strings::Substitute("[Driver] epoch_finishing operator [fragment_id=$0] [driver=$1] [operator=$2]",
                                    print_id(state->fragment_instance_id()), to_readable_string(), op->get_name());
    SCOPED_TIMER(op->_finishing_timer);
    stage = OperatorStage::EPOCH_FINISHING;
    QUERY_TRACE_SCOPED(op->get_name(), "set_epoch_finishing");
    return op->set_epoch_finishing(state);
}

// Moves `op` into the EPOCH_FINISHED stage and invokes its
// set_epoch_finished() hook. Idempotent: if the operator has already reached
// EPOCH_FINISHED, nothing happens and OK is returned.
Status StreamPipelineDriver::_mark_operator_epoch_finished(OperatorPtr& op, RuntimeState* state) {
    auto& stage = _operator_stages[op->get_id()];
    if (stage >= OperatorStage::EPOCH_FINISHED) {
        // Already epoch-finished — nothing to do.
        return Status::OK();
    }

    VLOG_ROW << strings::Substitute("[Driver] epoch_finished operator [fragment_id=$0] [driver=$1] [operator=$2]",
                                    print_id(state->fragment_instance_id()), to_readable_string(), op->get_name());
    // NOTE(review): this times the *finished* transition with the operator's
    // _finishing_timer — confirm whether a dedicated finished timer was intended.
    SCOPED_TIMER(op->_finishing_timer);
    stage = OperatorStage::EPOCH_FINISHED;
    QUERY_TRACE_SCOPED(op->get_name(), "set_epoch_finished");
    return op->set_epoch_finished(state);
}

// Rewinds the driver for the next epoch: resets every operator, returns each
// one to the PREPARED stage, and clears the epoch progress marker. Stops and
// propagates the first reset failure.
Status StreamPipelineDriver::reset_epoch(RuntimeState* runtime_state) {
    _first_epoch_unfinished = 0;
    for (size_t idx = 0; idx < _operators.size(); ++idx) {
        auto& op = _operators[idx];
        RETURN_IF_ERROR(op->reset_epoch(runtime_state));
        _operator_stages[op->get_id()] = OperatorStage::PREPARED;
    }
    return Status::OK();
}

// Finalizes the current epoch after the driver reached EPOCH_FINISH: records
// the time spent here in the driver/queue accounting and notifies the pipeline
// so it can count down its epoch-finished drivers.
void StreamPipelineDriver::epoch_finalize(RuntimeState* runtime_state, DriverState state) {
    int64_t elapsed_ns = 0;
    // Account the finalize cost even if the body below changes in the future.
    DeferOp account([this, &elapsed_ns]() {
        _update_driver_acct(0, 0, elapsed_ns);
        _in_queue->update_statistics(this);
    });
    SCOPED_RAW_TIMER(&elapsed_ns);

    VLOG_ROW << "[Driver] epoch_finalize, driver=" << this->to_readable_string();
    DCHECK(state == DriverState::EPOCH_FINISH);
    _pipeline->count_down_epoch_finished_driver(runtime_state);
}

} // namespace starrocks::pipeline
